import json

from unittest.mock import MagicMock, patch

from rest_framework import status

from posthog.constants import AvailableFeature
from posthog.models.dashboard import Dashboard
from posthog.models.feature_flag.feature_flag import FeatureFlag
from posthog.models.organization import OrganizationMembership
from posthog.models.personal_api_key import PersonalAPIKey, hash_key_value
from posthog.models.team.team import Team
from posthog.models.utils import generate_random_token_personal
from posthog.rbac.user_access_control import AccessSource
from posthog.utils import render_template

from products.notebooks.backend.models import Notebook

from ee.api.test.base import APILicensedTest
from ee.models.rbac.role import Role, RoleMembership


class BaseAccessControlTest(APILicensedTest):
    def setUp(self):
        super().setUp()
        self.organization.available_features = [
            AvailableFeature.ADVANCED_PERMISSIONS,
            AvailableFeature.ROLE_BASED_ACCESS,
        ]
        self.organization.save()

    def _put_project_access_control(self, data=None):
        payload = {"access_level": "admin"}

        if data:
            payload.update(data)

        return self.client.put(
            "/api/projects/@current/access_controls",
            payload,
        )

    def _put_global_access_control(self, data=None):
        payload = {"access_level": "editor"}
        if data:
            payload.update(data)

        return self.client.put(
            "/api/projects/@current/resource_access_controls",
            payload,
        )

    def _org_membership(self, level: OrganizationMembership.Level = OrganizationMembership.Level.ADMIN):
        self.organization_membership.level = level
        self.organization_membership.save()


class TestAccessControlProjectLevelAPI(BaseAccessControlTest):
    def test_project_change_rejected_if_not_org_admin(self):
        self._org_membership(OrganizationMembership.Level.MEMBER)
        res = self._put_project_access_control()
        assert res.status_code == status.HTTP_403_FORBIDDEN, res.json()

    def test_project_change_accepted_if_org_admin(self):
        self._org_membership(OrganizationMembership.Level.ADMIN)
        res = self._put_project_access_control()
        assert res.status_code == status.HTTP_200_OK, res.json()

    def test_project_change_accepted_if_org_owner(self):
        self._org_membership(OrganizationMembership.Level.OWNER)
        res = self._put_project_access_control()
        assert res.status_code == status.HTTP_200_OK, res.json()

    def test_project_removed_with_null(self):
        self._org_membership(OrganizationMembership.Level.OWNER)
        res = self._put_project_access_control()
        res = self._put_project_access_control({"access_level": None})
        assert res.status_code == status.HTTP_204_NO_CONTENT

    def test_project_change_if_in_access_control(self):
        self._org_membership(OrganizationMembership.Level.ADMIN)
        # Add ourselves to access
        res = self._put_project_access_control(
            {"organization_member": str(self.organization_membership.id), "access_level": "admin"}
        )
        assert res.status_code == status.HTTP_200_OK, res.json()

        self._org_membership(OrganizationMembership.Level.MEMBER)

        # Now change ourselves to a member
        res = self._put_project_access_control(
            {"organization_member": str(self.organization_membership.id), "access_level": "member"}
        )
        assert res.status_code == status.HTTP_200_OK, res.json()
        assert res.json()["access_level"] == "member"

        # Now try and change our own membership and fail!
        res = self._put_project_access_control(
            {"organization_member": str(self.organization_membership.id), "access_level": "admin"}
        )
        assert res.status_code == status.HTTP_403_FORBIDDEN
        assert res.json()["detail"] == "Must be admin to modify project permissions."

    def test_project_change_rejected_if_not_in_organization(self):
        self.organization_membership.delete()
        res = self._put_project_access_control(
            {"organization_member": str(self.organization_membership.id), "access_level": "admin"}
        )
        assert res.status_code == status.HTTP_404_NOT_FOUND, res.json()

    def test_project_change_rejected_if_bad_access_level(self):
        self._org_membership(OrganizationMembership.Level.ADMIN)
        res = self._put_project_access_control({"access_level": "bad"})
        assert res.status_code == status.HTTP_400_BAD_REQUEST, res.json()
        assert res.json()["detail"] == "Invalid access level. Must be one of: none, member, admin", res.json()


class TestAccessControlMinimumLevelValidation(BaseAccessControlTest):
    def test_action_access_level_cannot_be_below_viewer(self):
        """Test that action access level cannot be set below minimum 'viewer'"""
        self._org_membership(OrganizationMembership.Level.ADMIN)

        from posthog.models.action import Action

        action = Action.objects.create(team=self.team, name="test action")

        res = self.client.put(
            f"/api/projects/@current/actions/{action.id}/access_controls",
            {"access_level": "none"},
        )
        assert res.status_code == status.HTTP_400_BAD_REQUEST, res.json()
        assert "cannot be set below the minimum 'viewer'" in res.json()["detail"]

    def test_action_access_level_accepts_viewer_and_above(self):
        """Test that action access level accepts viewer, editor, and manager"""
        self._org_membership(OrganizationMembership.Level.ADMIN)

        from posthog.models.action import Action

        action = Action.objects.create(team=self.team, name="test action")

        for level in ["viewer", "editor", "manager"]:
            res = self.client.put(
                f"/api/projects/@current/actions/{action.id}/access_controls",
                {"access_level": level},
            )
            assert res.status_code == status.HTTP_200_OK, f"Failed for level {level}: {res.json()}"

    def test_activity_log_access_level_cannot_be_above_viewer(self):
        """Test that activity_log access level cannot be set above maximum 'viewer'"""
        self._org_membership(OrganizationMembership.Level.ADMIN)

        for level in ["editor", "manager"]:
            res = self.client.put(
                "/api/projects/@current/resource_access_controls",
                {"resource": "activity_log", "access_level": level},
            )
            assert res.status_code == status.HTTP_400_BAD_REQUEST, f"Failed for level {level}: {res.json()}"
            assert "cannot be set above the maximum 'viewer'" in res.json()["detail"]

    def test_activity_log_access_restricted_for_users_without_access(self):
        """Test that users without access to activity_log cannot access activity log endpoints"""
        self._org_membership(OrganizationMembership.Level.ADMIN)

        res = self.client.put(
            "/api/projects/@current/resource_access_controls",
            {"resource": "activity_log", "access_level": "none"},
        )
        assert res.status_code == status.HTTP_200_OK, f"Failed to set access control: {res.json()}"

        from ee.models.rbac.access_control import AccessControl

        ac = AccessControl.objects.filter(team=self.team, resource="activity_log", resource_id=None).first()
        assert ac is not None, "Access control was not created"
        assert ac.access_level == "none", f"Access level is {ac.access_level}, expected 'none'"

        self._org_membership(OrganizationMembership.Level.MEMBER)

        res = self.client.get("/api/projects/@current/activity_log/")
        assert res.status_code == status.HTTP_403_FORBIDDEN, f"Expected 403, got {res.status_code}: {res.json()}"

        res = self.client.get("/api/projects/@current/advanced_activity_logs/")
        assert res.status_code == status.HTTP_403_FORBIDDEN, f"Expected 403, got {res.status_code}: {res.json()}"


class TestAccessControlResourceLevelAPI(BaseAccessControlTest):
    def setUp(self):
        super().setUp()

        self.notebook = Notebook.objects.create(
            team=self.team, created_by=self.user, short_id="0", title="first notebook"
        )

        self.other_user = self._create_user("other_user")
        self.other_user_notebook = Notebook.objects.create(
            team=self.team, created_by=self.other_user, short_id="1", title="first notebook"
        )

    def _get_access_controls(self):
        return self.client.get(f"/api/projects/@current/notebooks/{self.notebook.short_id}/access_controls")

    def _put_access_control(self, data=None, notebook_id=None):
        payload = {
            "access_level": "editor",
        }

        if data:
            payload.update(data)
        return self.client.put(
            f"/api/projects/@current/notebooks/{notebook_id or self.notebook.short_id}/access_controls",
            payload,
        )

    def test_get_access_controls(self):
        self._org_membership(OrganizationMembership.Level.MEMBER)
        res = self._get_access_controls()
        assert res.status_code == status.HTTP_200_OK, res.json()
        assert res.json() == {
            "access_controls": [],
            "available_access_levels": ["none", "viewer", "editor", "manager"],
            "user_access_level": "manager",
            "default_access_level": "editor",
            "user_can_edit_access_levels": True,
            "minimum_access_level": "none",
            "maximum_access_level": "manager",
        }

    def test_change_rejected_if_not_org_admin(self):
        self._org_membership(OrganizationMembership.Level.MEMBER)
        res = self._put_access_control(notebook_id=self.other_user_notebook.short_id)
        assert res.status_code == status.HTTP_403_FORBIDDEN, res.json()

    def test_change_accepted_if_org_admin(self):
        self._org_membership(OrganizationMembership.Level.ADMIN)
        res = self._put_access_control(notebook_id=self.other_user_notebook.short_id)
        assert res.status_code == status.HTTP_200_OK, res.json()

    def test_change_accepted_if_creator_of_the_resource(self):
        self._org_membership(OrganizationMembership.Level.MEMBER)
        res = self._put_access_control(notebook_id=self.notebook.short_id)
        assert res.status_code == status.HTTP_200_OK, res.json()


class TestUsersWithAccessAPI(BaseAccessControlTest):
    """Test the new users_with_access endpoint"""

    def setUp(self):
        super().setUp()

        # Create additional users for testing
        self.user2 = self._create_user("user2@example.com")
        self.user3 = self._create_user("user3@example.com")
        self.user4 = self._create_user("user4@example.com")

        # Create a notebook for testing
        self.notebook = Notebook.objects.create(
            team=self.team, created_by=self.user, short_id="0", title="test notebook"
        )

        # Create a role for testing
        self.role = Role.objects.create(name="Test Role", organization=self.organization)

    def _get_users_with_access(self, notebook_id=None):
        return self.client.get(
            f"/api/projects/@current/notebooks/{notebook_id or self.notebook.short_id}/users_with_access"
        )

    def _put_notebook_access_control(self, notebook_id: str, data=None):
        payload = {
            "access_level": "editor",
        }
        if data:
            payload.update(data)
        return self.client.put(
            f"/api/projects/@current/notebooks/{notebook_id}/access_controls",
            payload,
        )

    def test_default_access_includes_all_org_members(self):
        """Test that by default all organization members have access"""
        self._org_membership(OrganizationMembership.Level.MEMBER)

        res = self._get_users_with_access()
        assert res.status_code == status.HTTP_200_OK, res.json()

        data = res.json()
        assert data["total_count"] == 4  # user, user2, user3, user4
        assert len(data["users"]) == 4
        # Check that all users are included with default access
        user_ids = [user["user_id"] for user in data["users"]]
        assert str(self.user.uuid) in user_ids
        assert str(self.user2.uuid) in user_ids
        assert str(self.user3.uuid) in user_ids
        assert str(self.user4.uuid) in user_ids

        # Check that creator has highest access level
        creator_user = next(user for user in data["users"] if user["user_id"] == str(self.user.uuid))
        assert creator_user["access_level"] == "manager"
        assert creator_user["access_source"] == AccessSource.CREATOR.value

        # Check that other users have default access level (not "none")
        other_users = [user for user in data["users"] if user["user_id"] != str(self.user.uuid)]
        for user in other_users:
            assert user["access_level"] != "none"

    def test_org_admin_has_highest_access(self):
        """Test that org admins get highest access level"""
        self._org_membership(OrganizationMembership.Level.ADMIN)

        # Create a notebook by another user so we can test org admin access
        other_notebook = Notebook.objects.create(
            team=self.team, created_by=self.user2, short_id="2", title="other notebook"
        )

        res = self.client.get(f"/api/projects/@current/notebooks/{other_notebook.short_id}/users_with_access")
        assert res.status_code == status.HTTP_200_OK, res.json()

        data = res.json()
        admin_user = next(user for user in data["users"] if user["user_id"] == str(self.user.uuid))
        assert admin_user["access_level"] == "manager"
        assert admin_user["access_source"] == AccessSource.ORGANIZATION_ADMIN.value

    def test_explicit_access_control_shows_correct_source(self):
        """Test that explicit access controls are properly identified"""
        self._org_membership(OrganizationMembership.Level.ADMIN)

        # Give user2 explicit access
        res = self._put_notebook_access_control(
            self.notebook.short_id,
            {
                "organization_member": str(self.user2.organization_memberships.get(organization=self.organization).id),
                "access_level": "viewer",
            },
        )
        assert res.status_code == status.HTTP_200_OK, res.json()

        res = self._get_users_with_access()
        assert res.status_code == status.HTTP_200_OK, res.json()

        data = res.json()
        user2_data = next(user for user in data["users"] if user["user_id"] == str(self.user2.uuid))
        assert user2_data["access_level"] == "viewer"
        assert user2_data["access_source"] == AccessSource.EXPLICIT_MEMBER.value

    def test_role_based_access_shows_correct_source(self):
        """Test that role-based access is properly identified"""
        self._org_membership(OrganizationMembership.Level.ADMIN)

        # Add user2 to role
        RoleMembership.objects.create(
            user=self.user2,
            role=self.role,
            organization_member=self.user2.organization_memberships.get(organization=self.organization),
        )

        # Give role access to notebook
        res = self._put_notebook_access_control(
            self.notebook.short_id, {"role": str(self.role.id), "access_level": "viewer"}
        )
        assert res.status_code == status.HTTP_200_OK, res.json()

        res = self._get_users_with_access()
        assert res.status_code == status.HTTP_200_OK, res.json()

        data = res.json()
        user2_data = next(user for user in data["users"] if user["user_id"] == str(self.user2.uuid))
        assert user2_data["access_level"] == "viewer"
        assert user2_data["access_source"] == AccessSource.EXPLICIT_ROLE.value

    def test_project_level_access_shows_correct_source(self):
        """Test that project-level access is properly identified"""
        self._org_membership(OrganizationMembership.Level.ADMIN)

        # Give user2 project-level access
        res = self._put_project_access_control(
            {
                "organization_member": str(self.user2.organization_memberships.get(organization=self.organization).id),
                "access_level": "admin",
            }
        )
        assert res.status_code == status.HTTP_200_OK, res.json()

        res = self._get_users_with_access()
        assert res.status_code == status.HTTP_200_OK, res.json()

        data = res.json()
        user2_data = next(user for user in data["users"] if user["user_id"] == str(self.user2.uuid))
        assert user2_data["access_level"] == "editor"
        assert user2_data["access_source"] == AccessSource.PROJECT_ADMIN.value

    def test_no_access_users_excluded(self):
        """Test that users with no access are excluded"""
        self._org_membership(OrganizationMembership.Level.ADMIN)

        # Set notebook to no access by default
        res = self._put_notebook_access_control(self.notebook.short_id, {"access_level": "none"})
        assert res.status_code == status.HTTP_200_OK, res.json()

        res = self._get_users_with_access()
        assert res.status_code == status.HTTP_200_OK, res.json()

        data = res.json()
        # Only creator should have access (others are excluded due to "none" access level)
        assert data["total_count"] == 1  # Only creator has access
        creator_user = next(user for user in data["users"] if user["user_id"] == str(self.user.uuid))
        assert creator_user["access_level"] == "manager"
        assert creator_user["access_source"] == AccessSource.CREATOR.value

        # Other users should be excluded entirely
        other_user_ids = [str(self.user2.uuid), str(self.user3.uuid), str(self.user4.uuid)]
        for user_id in other_user_ids:
            assert not any(user["user_id"] == user_id for user in data["users"])

    def test_access_level_prioritization(self):
        """Test that higher access levels take precedence"""
        self._org_membership(OrganizationMembership.Level.ADMIN)

        # Give user2 explicit viewer access
        res = self._put_notebook_access_control(
            self.notebook.short_id,
            {
                "organization_member": str(self.user2.organization_memberships.get(organization=self.organization).id),
                "access_level": "viewer",
            },
        )
        assert res.status_code == status.HTTP_200_OK, res.json()

        # Make user2 org admin (should override explicit access)
        user2_membership = self.user2.organization_memberships.get(organization=self.organization)
        user2_membership.level = OrganizationMembership.Level.ADMIN
        user2_membership.save()

        res = self._get_users_with_access()
        assert res.status_code == status.HTTP_200_OK, res.json()

        data = res.json()
        user2_data = next(user for user in data["users"] if user["user_id"] == str(self.user2.uuid))
        assert user2_data["access_level"] == "manager"
        assert user2_data["access_source"] == AccessSource.ORGANIZATION_ADMIN.value

    def test_users_sorted_by_access_level_then_email(self):
        """Test that users are sorted by access level (highest first) then by email"""
        self._org_membership(OrganizationMembership.Level.ADMIN)

        # Give different access levels to different users
        res = self._put_notebook_access_control(
            self.notebook.short_id,
            {
                "organization_member": str(self.user2.organization_memberships.get(organization=self.organization).id),
                "access_level": "viewer",
            },
        )
        assert res.status_code == status.HTTP_200_OK, res.json()

        res = self._put_notebook_access_control(
            self.notebook.short_id,
            {
                "organization_member": str(self.user3.organization_memberships.get(organization=self.organization).id),
                "access_level": "editor",
            },
        )
        assert res.status_code == status.HTTP_200_OK, res.json()

        res = self._get_users_with_access()
        assert res.status_code == status.HTTP_200_OK, res.json()

        data = res.json()
        # Should be sorted: manager (creator), editor (user3), editor (user4 default), viewer (user2)
        assert data["users"][0]["access_level"] == "manager"  # creator
        assert data["users"][1]["access_level"] == "editor"  # user3
        assert data["users"][2]["access_level"] == "editor"  # user4 (default)
        assert data["users"][3]["access_level"] == "viewer"  # user2

    def test_endpoint_requires_permission(self):
        """Test that the endpoint requires appropriate permissions"""
        # Set project-level access to "none" as admin first
        self._org_membership(OrganizationMembership.Level.ADMIN)
        res = self._put_project_access_control({"access_level": "none"})
        assert res.status_code == status.HTTP_200_OK, res.json()

        # Switch to member level
        self._org_membership(OrganizationMembership.Level.MEMBER)

        # Try to access another user's notebook
        other_notebook = Notebook.objects.create(
            team=self.team, created_by=self.user2, short_id="1", title="other notebook"
        )

        res = self._get_users_with_access(other_notebook.short_id)
        assert res.status_code == status.HTTP_403_FORBIDDEN

    def test_endpoint_returns_correct_user_data(self):
        """Test that the endpoint returns all required user data fields"""
        self._org_membership(OrganizationMembership.Level.MEMBER)

        res = self._get_users_with_access()
        assert res.status_code == status.HTTP_200_OK, res.json()

        data = res.json()
        user_data = data["users"][0]  # First user

        # Check all required fields are present
        assert "user_id" in user_data
        assert "access_level" in user_data
        assert "access_source" in user_data
        assert "organization_membership_id" in user_data
        assert "organization_membership_level" in user_data

        # Check data types
        assert isinstance(user_data["user_id"], str)
        assert isinstance(user_data["access_level"], str)
        assert isinstance(user_data["access_source"], str)

    def test_endpoint_works_with_different_resource_types(self):
        """Test that the endpoint works with different resource types (notebooks, dashboards, etc.)"""
        self._org_membership(OrganizationMembership.Level.MEMBER)

        # Test with dashboard
        dashboard = Dashboard.objects.create(team=self.team, created_by=self.user, name="test dashboard")

        res = self.client.get(f"/api/projects/@current/dashboards/{dashboard.id}/users_with_access")
        assert res.status_code == status.HTTP_200_OK, res.json()

        data = res.json()
        assert data["total_count"] >= 1
        assert any(user["user_id"] == str(self.user.uuid) for user in data["users"])

    def test_endpoint_handles_empty_organization(self):
        """Test that the endpoint handles organizations with no members gracefully"""
        self._org_membership(OrganizationMembership.Level.MEMBER)

        # Remove all other users from organization
        OrganizationMembership.objects.filter(organization=self.organization).exclude(user=self.user).delete()

        res = self._get_users_with_access()
        assert res.status_code == status.HTTP_200_OK, res.json()

        data = res.json()
        assert data["total_count"] == 1
        assert data["users"][0]["user_id"] == str(self.user.uuid)

    def test_project_level_none_access_excludes_users(self):
        """Test that when project-level access is set to 'none', users without project access are excluded from the list"""
        self._org_membership(OrganizationMembership.Level.ADMIN)

        # Set project-level access to "none"
        res = self._put_project_access_control({"access_level": "none"})
        assert res.status_code == status.HTTP_200_OK, res.json()

        # Give user2 explicit project access so they should still appear
        res = self._put_project_access_control(
            {
                "organization_member": str(self.user2.organization_memberships.get(organization=self.organization).id),
                "access_level": "member",
            }
        )
        assert res.status_code == status.HTTP_200_OK, res.json()

        res = self._get_users_with_access()
        assert res.status_code == status.HTTP_200_OK, res.json()

        data = res.json()
        # Creator and user2 should have access, others should be excluded due to project-level "none" access
        assert data["total_count"] == 2
        user_ids = [user["user_id"] for user in data["users"]]
        assert str(self.user.uuid) in user_ids  # creator
        assert str(self.user2.uuid) in user_ids  # explicit project access
        assert str(self.user3.uuid) not in user_ids  # no project access
        assert str(self.user4.uuid) not in user_ids  # no project access

        # Check that creator has highest access level
        creator_user = next(user for user in data["users"] if user["user_id"] == str(self.user.uuid))
        assert creator_user["access_level"] == "manager"
        assert creator_user["access_source"] == AccessSource.CREATOR.value

        # Check that user2 has project-level access
        user2_data = next(user for user in data["users"] if user["user_id"] == str(self.user2.uuid))
        assert user2_data["access_level"] == "editor"  # default resource access level
        assert user2_data["access_source"] == AccessSource.PROJECT_ADMIN.value

    def test_only_active_users_included(self):
        """Test that only active users are included in the users_with_access endpoint"""
        self._org_membership(OrganizationMembership.Level.ADMIN)

        # Create an inactive user and add them to the organization
        inactive_user = self._create_user("inactive_user@example.com")
        inactive_user.is_active = False
        inactive_user.save()

        # Get users with access
        res = self._get_users_with_access()
        assert res.status_code == status.HTTP_200_OK, res.json()

        data = res.json()
        user_ids = [user["user_id"] for user in data["users"]]

        # Verify inactive user is not included
        assert str(inactive_user.uuid) not in user_ids

        # Verify active users are still included
        assert str(self.user.uuid) in user_ids
        assert str(self.user2.uuid) in user_ids
        assert str(self.user3.uuid) in user_ids
        assert str(self.user4.uuid) in user_ids


class TestGlobalAccessControlsPermissions(BaseAccessControlTest):
    def setUp(self):
        super().setUp()

        self.role = Role.objects.create(name="Engineers", organization=self.organization)
        self.role_membership = RoleMembership.objects.create(user=self.user, role=self.role)

    def test_admin_can_always_access(self):
        self._org_membership(OrganizationMembership.Level.ADMIN)
        assert (
            self._put_global_access_control({"resource": "feature_flag", "access_level": "none"}).status_code
            == status.HTTP_200_OK
        )
        assert self.client.get("/api/projects/@current/feature_flags").status_code == status.HTTP_200_OK

    def test_forbidden_access_if_resource_wide_control_in_place(self):
        self._org_membership(OrganizationMembership.Level.ADMIN)
        assert (
            self._put_global_access_control({"resource": "feature_flag", "access_level": "none"}).status_code
            == status.HTTP_200_OK
        )
        self._org_membership(OrganizationMembership.Level.MEMBER)

        assert self.client.get("/api/projects/@current/feature_flags").status_code == status.HTTP_403_FORBIDDEN
        assert self.client.post("/api/projects/@current/feature_flags").status_code == status.HTTP_403_FORBIDDEN

    def test_forbidden_write_access_if_resource_wide_control_in_place(self):
        self._org_membership(OrganizationMembership.Level.ADMIN)
        assert (
            self._put_global_access_control({"resource": "feature_flag", "access_level": "viewer"}).status_code
            == status.HTTP_200_OK
        )
        self._org_membership(OrganizationMembership.Level.MEMBER)

        assert self.client.get("/api/projects/@current/feature_flags").status_code == status.HTTP_200_OK
        assert self.client.post("/api/projects/@current/feature_flags").status_code == status.HTTP_403_FORBIDDEN

    def test_access_granted_with_granted_role(self):
        self._org_membership(OrganizationMembership.Level.ADMIN)
        assert (
            self._put_global_access_control({"resource": "feature_flag", "access_level": "none"}).status_code
            == status.HTTP_200_OK
        )
        assert (
            self._put_global_access_control(
                {"resource": "feature_flag", "access_level": "viewer", "role": self.role.id}
            ).status_code
            == status.HTTP_200_OK
        )
        self._org_membership(OrganizationMembership.Level.MEMBER)

        assert self.client.get("/api/projects/@current/feature_flags").status_code == status.HTTP_200_OK
        assert self.client.post("/api/projects/@current/feature_flags").status_code == status.HTTP_403_FORBIDDEN

        self.role_membership.delete()
        assert self.client.get("/api/projects/@current/feature_flags").status_code == status.HTTP_403_FORBIDDEN


class TestAccessControlPermissions(BaseAccessControlTest):
    """
    Test actual permissions being applied for a resource (notebooks as an example)
    """

    def setUp(self):
        super().setUp()
        self.other_user = self._create_user("other_user")

        self.other_user_notebook = Notebook.objects.create(
            team=self.team, created_by=self.other_user, title="not my notebook"
        )

        self.notebook = Notebook.objects.create(team=self.team, created_by=self.user, title="my notebook")

    def _post_notebook(self):
        return self.client.post("/api/projects/@current/notebooks/", {"title": "notebook"})

    def _patch_notebook(self, id: str):
        return self.client.patch(f"/api/projects/@current/notebooks/{id}", {"title": "new-title"})

    def _get_notebook(self, id: str):
        return self.client.get(f"/api/projects/@current/notebooks/{id}")

    def _put_notebook_access_control(self, notebook_id: str, data=None):
        payload = {
            "access_level": "editor",
        }

        if data:
            payload.update(data)
        return self.client.put(
            f"/api/projects/@current/notebooks/{notebook_id}/access_controls",
            payload,
        )

    def test_default_allows_all_access(self):
        self._org_membership(OrganizationMembership.Level.MEMBER)
        assert self._get_notebook(self.other_user_notebook.short_id).status_code == status.HTTP_200_OK
        assert self._patch_notebook(id=self.other_user_notebook.short_id).status_code == status.HTTP_200_OK
        res = self._post_notebook()
        assert res.status_code == status.HTTP_201_CREATED
        assert self._patch_notebook(id=res.json()["short_id"]).status_code == status.HTTP_200_OK

    def test_rejects_all_access_without_project_access(self):
        self._org_membership(OrganizationMembership.Level.ADMIN)
        assert self._put_project_access_control({"access_level": "none"}).status_code == status.HTTP_200_OK
        self._org_membership(OrganizationMembership.Level.MEMBER)

        assert self._get_notebook(self.other_user_notebook.short_id).status_code == status.HTTP_403_FORBIDDEN
        assert self._patch_notebook(id=self.other_user_notebook.short_id).status_code == status.HTTP_403_FORBIDDEN
        assert self._post_notebook().status_code == status.HTTP_403_FORBIDDEN

    def test_permits_access_with_member_control(self):
        self._org_membership(OrganizationMembership.Level.ADMIN)
        assert self._put_project_access_control({"access_level": "none"}).status_code == status.HTTP_200_OK
        assert (
            self._put_project_access_control(
                {"access_level": "member", "organization_member": str(self.organization_membership.id)}
            ).status_code
            == status.HTTP_200_OK
        )
        self._org_membership(OrganizationMembership.Level.MEMBER)

        assert self._get_notebook(self.other_user_notebook.short_id).status_code == status.HTTP_200_OK
        assert self._patch_notebook(id=self.other_user_notebook.short_id).status_code == status.HTTP_200_OK
        assert self._post_notebook().status_code == status.HTTP_201_CREATED

    def test_rejects_edit_access_with_resource_control(self):
        self._org_membership(OrganizationMembership.Level.ADMIN)
        # Set other notebook to only allow view access by default
        assert (
            self._put_notebook_access_control(self.other_user_notebook.short_id, {"access_level": "viewer"}).status_code
            == status.HTTP_200_OK
        )
        self._org_membership(OrganizationMembership.Level.MEMBER)

        assert self._get_notebook(self.other_user_notebook.short_id).status_code == status.HTTP_200_OK
        assert self._patch_notebook(id=self.other_user_notebook.short_id).status_code == status.HTTP_403_FORBIDDEN
        assert self._post_notebook().status_code == status.HTTP_201_CREATED

    def test_rejects_view_access_if_not_creator(self):
        self._org_membership(OrganizationMembership.Level.ADMIN)
        # Set other notebook to only allow view access by default
        assert (
            self._put_notebook_access_control(self.other_user_notebook.short_id, {"access_level": "none"}).status_code
            == status.HTTP_200_OK
        )
        assert (
            self._put_notebook_access_control(self.notebook.short_id, {"access_level": "none"}).status_code
            == status.HTTP_200_OK
        )
        self._org_membership(OrganizationMembership.Level.MEMBER)

        # Access to other notebook is denied
        assert self._get_notebook(self.other_user_notebook.short_id).status_code == status.HTTP_403_FORBIDDEN
        assert self._patch_notebook(id=self.other_user_notebook.short_id).status_code == status.HTTP_403_FORBIDDEN
        # As creator, access to my notebook is still permitted
        assert self._get_notebook(self.notebook.short_id).status_code == status.HTTP_200_OK
        assert self._patch_notebook(id=self.notebook.short_id).status_code == status.HTTP_200_OK

    def test_org_level_endpoints_work(self):
        assert self.client.get("/api/organizations/@current/plugins").status_code == status.HTTP_200_OK


class TestAccessControlQueryCounts(BaseAccessControlTest):
    def setUp(self):
        super().setUp()
        self.other_user = self._create_user("other_user")

        self.other_user_notebook = Notebook.objects.create(
            team=self.team, created_by=self.other_user, title="not my notebook"
        )

        self.notebook = Notebook.objects.create(team=self.team, created_by=self.user, title="my notebook")

        # Baseline call to trigger caching of one off things like instance settings
        self.client.get(f"/api/projects/@current/notebooks/{self.notebook.short_id}")

    def test_query_counts(self):
        self._org_membership(OrganizationMembership.Level.MEMBER)
        my_dashboard = Dashboard.objects.create(team=self.team, created_by=self.user, name="my dashboard")
        other_user_dashboard = Dashboard.objects.create(
            team=self.team, created_by=self.other_user, name="other user dashboard"
        )

        # Baseline query (triggers any first time cache things)
        self.client.get(f"/api/projects/@current/notebooks/{self.notebook.short_id}")
        baseline = 18

        # Access controls total 2 extra queries - 1 for org membership, 1 for the user roles, 1 for the preloaded access controls
        with self.assertNumQueries(baseline + 4):
            self.client.get(f"/api/projects/@current/dashboards/{my_dashboard.id}?no_items_field=true")

        # Accessing a different users dashboard doesn't +1 as the preload works using the pk
        with self.assertNumQueries(baseline + 4):
            self.client.get(f"/api/projects/@current/dashboards/{other_user_dashboard.id}?no_items_field=true")

        baseline = 8
        # Getting my own notebook is the same as a dashboard - 3 extra queries
        with self.assertNumQueries(baseline + 5):
            self.client.get(f"/api/projects/@current/notebooks/{self.notebook.short_id}")

        # Except when accessing a different notebook where we _also_ need to check as we are not the creator and the pk is not the same (short_id)
        with self.assertNumQueries(baseline + 6):
            self.client.get(f"/api/projects/@current/notebooks/{self.other_user_notebook.short_id}")

        baseline = 8
        # Project access doesn't double query the object
        with self.assertNumQueries(baseline + 7):
            # We call this endpoint as we don't want to include all the extra queries that rendering the project uses
            self.client.get("/api/projects/@current/is_generating_demo_data")

        # When accessing the list of notebooks we have extra queries due to checking for role based access and filtering out items
        baseline = 9
        with self.assertNumQueries(baseline + 6):  # org, roles, preloaded access controls
            self.client.get("/api/projects/@current/notebooks/")

    def test_query_counts_with_preload_optimization(self):
        self._org_membership(OrganizationMembership.Level.MEMBER)
        my_dashboard = Dashboard.objects.create(team=self.team, created_by=self.user, name="my dashboard")
        other_user_dashboard = Dashboard.objects.create(
            team=self.team, created_by=self.other_user, name="other user dashboard"
        )

        # Baseline query (triggers any first time cache things)
        self.client.get(f"/api/projects/@current/notebooks/{self.notebook.short_id}")
        baseline = 17

        # Access controls total 2 extra queries - 1 for org membership, 1 for the user roles, 1 for the preloaded access controls
        with self.assertNumQueries(baseline + 5):
            self.client.get(f"/api/projects/@current/dashboards/{my_dashboard.id}?no_items_field=true")

        # Accessing a different users dashboard doesn't +1 as the preload works using the pk
        with self.assertNumQueries(baseline + 5):
            self.client.get(f"/api/projects/@current/dashboards/{other_user_dashboard.id}?no_items_field=true")

    def test_query_counts_only_adds_1_for_non_pk_resources(self):
        self._org_membership(OrganizationMembership.Level.MEMBER)
        # Baseline query (triggers any first time cache things)
        self.client.get(f"/api/projects/@current/notebooks/{self.notebook.short_id}")
        baseline = 8

        # Getting my own notebook is the same as a dashboard - 3 extra queries
        with self.assertNumQueries(baseline + 5):
            self.client.get(f"/api/projects/@current/notebooks/{self.notebook.short_id}")

        # Except when accessing a different notebook where we _also_ need to check as we are not the creator and the pk is not the same (short_id)
        with self.assertNumQueries(baseline + 6):
            self.client.get(f"/api/projects/@current/notebooks/{self.other_user_notebook.short_id}")

    def test_query_counts_stable_for_project_access(self):
        self._org_membership(OrganizationMembership.Level.MEMBER)

        baseline = 8
        # Project access doesn't double query the object
        with self.assertNumQueries(baseline + 7):
            # We call this endpoint as we don't want to include all the extra queries that rendering the project uses
            self.client.get("/api/projects/@current/is_generating_demo_data")

        # When accessing the list of notebooks we have extra queries due to checking for role based access and filtering out items
        baseline = 9
        with self.assertNumQueries(baseline + 6):  # org, roles, preloaded access controls
            self.client.get("/api/projects/@current/notebooks/")

    def test_query_counts_stable_when_listing_resources(self):
        # When accessing the list of notebooks we have extra queries due to checking for role based access and filtering out items
        baseline = 9

        with self.assertNumQueries(baseline + 6):  # org, roles, preloaded access controls
            self.client.get("/api/projects/@current/notebooks/")

    def test_query_counts_stable_when_listing_resources_including_access_control_info(self):
        for i in range(10):
            FeatureFlag.objects.create(team=self.team, created_by=self.other_user, key=f"flag-{i}")

        baseline = 16  # This is a lot! There is currently an n+1 issue with the legacy access control system

        with self.assertNumQueries(baseline + 7):  # org, roles, preloaded permissions acs, preloaded acs for the list
            self.client.get("/api/projects/@current/feature_flags/")

        for i in range(10):
            FeatureFlag.objects.create(team=self.team, created_by=self.other_user, key=f"flag-{10 + i}")

        with self.assertNumQueries(baseline + 7):  # org, roles, preloaded permissions acs, preloaded acs for the list
            self.client.get("/api/projects/@current/feature_flags/")


class TestAccessControlFiltering(BaseAccessControlTest):
    def setUp(self):
        super().setUp()
        self.other_user = self._create_user("other_user")

        self.other_user_notebook = Notebook.objects.create(
            team=self.team, created_by=self.other_user, title="not my notebook"
        )

        self.notebook = Notebook.objects.create(team=self.team, created_by=self.user, title="my notebook")

    def _put_notebook_access_control(self, notebook_id: str, data=None):
        payload = {
            "access_level": "editor",
        }

        if data:
            payload.update(data)
        return self.client.put(
            f"/api/projects/@current/notebooks/{notebook_id}/access_controls",
            payload,
        )

    def _get_notebooks(self):
        return self.client.get("/api/projects/@current/notebooks/")

    def test_default_allows_all_access(self):
        self._org_membership(OrganizationMembership.Level.MEMBER)
        assert len(self._get_notebooks().json()["results"]) == 2

    def test_does_not_list_notebooks_without_access(self):
        self._org_membership(OrganizationMembership.Level.ADMIN)
        assert (
            self._put_notebook_access_control(self.other_user_notebook.short_id, {"access_level": "none"}).status_code
            == status.HTTP_200_OK
        )
        assert (
            self._put_notebook_access_control(self.notebook.short_id, {"access_level": "none"}).status_code
            == status.HTTP_200_OK
        )
        self._org_membership(OrganizationMembership.Level.MEMBER)

        res = self._get_notebooks()
        assert len(res.json()["results"]) == 1
        assert res.json()["results"][0]["id"] == str(self.notebook.id)

    def test_list_notebooks_with_explicit_access(self):
        self._org_membership(OrganizationMembership.Level.ADMIN)
        assert (
            self._put_notebook_access_control(self.other_user_notebook.short_id, {"access_level": "none"}).status_code
            == status.HTTP_200_OK
        )
        assert (
            self._put_notebook_access_control(
                self.other_user_notebook.short_id,
                {"organization_member": str(self.organization_membership.id), "access_level": "viewer"},
            ).status_code
            == status.HTTP_200_OK
        )
        self._org_membership(OrganizationMembership.Level.MEMBER)

        res = self._get_notebooks()
        assert len(res.json()["results"]) == 2

    def test_search_results_exclude_restricted_objects(self):
        res = self.client.get("/api/projects/@current/search?q=my notebook")
        assert len(res.json()["results"]) == 2

        self._org_membership(OrganizationMembership.Level.ADMIN)
        assert (
            self._put_notebook_access_control(self.other_user_notebook.short_id, {"access_level": "none"}).status_code
            == status.HTTP_200_OK
        )

        self._org_membership(OrganizationMembership.Level.MEMBER)

        res = self.client.get("/api/projects/@current/search?q=my notebook")
        assert len(res.json()["results"]) == 1


class TestAccessControlProjectFiltering(BaseAccessControlTest):
    """
    Projects are listed in multiple places and ways so we need to test all of them here
    """

    def setUp(self):
        super().setUp()
        self.other_team = Team.objects.create(organization=self.organization, name="other team")
        self.other_team_2 = Team.objects.create(organization=self.organization, name="other team 2")

    def _put_project_access_control_as_admin(self, team_id: int, data=None):
        self._org_membership(OrganizationMembership.Level.ADMIN)
        payload = {
            "access_level": "editor",
        }

        if data:
            payload.update(data)
        res = self.client.put(
            f"/api/projects/{team_id}/access_controls",
            payload,
        )

        self._org_membership(OrganizationMembership.Level.MEMBER)

        assert res.status_code == status.HTTP_200_OK, res.json()
        return res

    def _get_posthog_app_context(self):
        mock_template = MagicMock()
        with patch("posthog.utils.get_template", return_value=mock_template):
            mock_request = MagicMock()
            mock_request.user = self.user
            mock_request.GET = {}
            render_template("index.html", request=mock_request, context={})

            # Get the context passed to the template
            return json.loads(mock_template.render.call_args[0][0]["posthog_app_context"])

    def test_default_lists_all_projects(self):
        assert len(self.client.get("/api/projects").json()["results"]) == 3
        me_response = self.client.get("/api/users/@me").json()
        assert len(me_response["organization"]["teams"]) == 3

    def test_does_not_list_projects_without_access(self):
        self._put_project_access_control_as_admin(self.other_team.id, {"access_level": "none"})
        assert len(self.client.get("/api/projects").json()["results"]) == 2
        me_response = self.client.get("/api/users/@me").json()
        assert len(me_response["organization"]["teams"]) == 2

    def test_always_lists_all_projects_if_org_admin(self):
        self._put_project_access_control_as_admin(self.other_team.id, {"access_level": "none"})
        self._org_membership(OrganizationMembership.Level.ADMIN)
        assert len(self.client.get("/api/projects").json()["results"]) == 3
        me_response = self.client.get("/api/users/@me").json()
        assert len(me_response["organization"]["teams"]) == 3

    def test_template_render_filters_teams(self):
        app_context = self._get_posthog_app_context()
        assert len(app_context["current_user"]["organization"]["teams"]) == 3
        assert app_context["current_team"]["id"] == self.team.id
        assert app_context["current_team"]["user_access_level"] == "admin"

        self._put_project_access_control_as_admin(self.team.id, {"access_level": "none"})
        app_context = self._get_posthog_app_context()
        assert len(app_context["current_user"]["organization"]["teams"]) == 2
        assert app_context["current_team"]["id"] == self.team.id
        assert app_context["current_team"]["user_access_level"] == "none"


# TODO: Add tests to check that a dashboard can't be edited if the user doesn't have access


class TestAccessControlScopeRequirements(BaseAccessControlTest):
    """
    Test that access control endpoints require the correct scopes
    """

    def setUp(self):
        super().setUp()
        self._org_membership(OrganizationMembership.Level.ADMIN)

    def test_access_controls_get_requires_access_control_read_scope(self):
        """Test that GET requests to access_controls endpoint require access_control:read scope"""
        key_value = generate_random_token_personal()
        PersonalAPIKey.objects.create(
            user=self.user,
            label="test_key",
            secure_value=hash_key_value(key_value),
            scopes=["project:read"],  # Only project:read, no access_control:read
        )

        response = self.client.get(
            "/api/projects/@current/access_controls", headers={"authorization": f"Bearer {key_value}"}
        )
        assert response.status_code == status.HTTP_403_FORBIDDEN
        assert "access_control:read" in response.json()["detail"]

    def test_resource_access_controls_get_requires_access_control_read_scope(self):
        """Test that GET requests to resource_access_controls endpoint require access_control:read scope"""
        key_value = generate_random_token_personal()
        PersonalAPIKey.objects.create(
            user=self.user,
            label="test_key",
            secure_value=hash_key_value(key_value),
            scopes=["project:read"],  # Only project:read, no access_control:read
        )

        response = self.client.get(
            "/api/projects/@current/resource_access_controls", headers={"authorization": f"Bearer {key_value}"}
        )
        assert response.status_code == status.HTTP_403_FORBIDDEN
        assert "access_control:read" in response.json()["detail"]

    def test_deprecated_global_access_controls_get_requires_access_control_read_scope(self):
        """Test that GET requests to deprecated global_access_controls endpoint require access_control:read scope"""
        key_value = generate_random_token_personal()
        PersonalAPIKey.objects.create(
            user=self.user,
            label="test_key",
            secure_value=hash_key_value(key_value),
            scopes=["project:read"],  # Only project:read, no access_control:read
        )

        response = self.client.get(
            "/api/projects/@current/global_access_controls", headers={"authorization": f"Bearer {key_value}"}
        )
        assert response.status_code == status.HTTP_403_FORBIDDEN
        assert "access_control:read" in response.json()["detail"]

    def test_access_controls_get_succeeds_with_access_control_read_scope(self):
        """Test that GET requests to access_controls endpoint succeed with access_control:read scope"""
        key_value = generate_random_token_personal()
        PersonalAPIKey.objects.create(
            user=self.user, label="test_key", secure_value=hash_key_value(key_value), scopes=["access_control:read"]
        )

        response = self.client.get(
            "/api/projects/@current/access_controls", headers={"authorization": f"Bearer {key_value}"}
        )
        assert response.status_code == status.HTTP_200_OK

    def test_resource_access_controls_get_succeeds_with_access_control_read_scope(self):
        """Test that GET requests to resource_access_controls endpoint succeed with access_control:read scope"""
        key_value = generate_random_token_personal()
        PersonalAPIKey.objects.create(
            user=self.user, label="test_key", secure_value=hash_key_value(key_value), scopes=["access_control:read"]
        )

        response = self.client.get(
            "/api/projects/@current/resource_access_controls", headers={"authorization": f"Bearer {key_value}"}
        )
        assert response.status_code == status.HTTP_200_OK

    def test_notebook_access_controls_get_requires_access_control_read_scope(self):
        """Test that GET requests to notebook access_controls endpoint require access_control:read scope"""
        notebook = Notebook.objects.create(
            team=self.team, created_by=self.user, short_id="test-scope", title="test notebook"
        )

        key_value = generate_random_token_personal()
        PersonalAPIKey.objects.create(
            user=self.user,
            label="test_key",
            secure_value=hash_key_value(key_value),
            scopes=["project:read"],  # Only project:read, no access_control:read
        )

        response = self.client.get(
            f"/api/projects/@current/notebooks/{notebook.short_id}/access_controls",
            headers={"authorization": f"Bearer {key_value}"},
        )
        assert response.status_code == status.HTTP_403_FORBIDDEN
        assert "access_control:read" in response.json()["detail"]

    def test_notebook_access_controls_get_succeeds_with_access_control_read_scope(self):
        """Test that GET requests to notebook access_controls endpoint succeed with access_control:read scope"""
        notebook = Notebook.objects.create(
            team=self.team, created_by=self.user, short_id="test-scope", title="test notebook"
        )

        key_value = generate_random_token_personal()
        PersonalAPIKey.objects.create(
            user=self.user, label="test_key", secure_value=hash_key_value(key_value), scopes=["access_control:read"]
        )

        response = self.client.get(
            f"/api/projects/@current/notebooks/{notebook.short_id}/access_controls",
            headers={"authorization": f"Bearer {key_value}"},
        )
        assert response.status_code == status.HTTP_200_OK

    def test_notebook_access_controls_put_fails_with_only_read_scope(self):
        """Test that PUT requests to notebook access_controls endpoint fail with only access_control:read scope"""
        notebook = Notebook.objects.create(
            team=self.team, created_by=self.user, short_id="test-scope", title="test notebook"
        )

        key_value = generate_random_token_personal()
        PersonalAPIKey.objects.create(
            user=self.user,
            label="test_key",
            secure_value=hash_key_value(key_value),
            scopes=["access_control:read"],  # Only read scope, no write permissions
        )

        response = self.client.put(
            f"/api/projects/@current/notebooks/{notebook.short_id}/access_controls",
            {"organization_member": str(self.organization_membership.id), "access_level": "viewer"},
            headers={"authorization": f"Bearer {key_value}"},
        )
        assert response.status_code == status.HTTP_403_FORBIDDEN
        assert "access_control:write" in response.json()["detail"]

    def test_notebook_access_controls_put_succeeds_with_write_scope(self):
        """Test that PUT requests to notebook access_controls endpoint succeed with access_control:write scope"""
        notebook = Notebook.objects.create(
            team=self.team, created_by=self.user, short_id="test-scope", title="test notebook"
        )

        key_value = generate_random_token_personal()
        PersonalAPIKey.objects.create(
            user=self.user,
            label="test_key_write",
            secure_value=hash_key_value(key_value),
            scopes=["access_control:write"],  # Write scope required for PUT
        )

        response = self.client.put(
            f"/api/projects/@current/notebooks/{notebook.short_id}/access_controls",
            {"organization_member": str(self.organization_membership.id), "access_level": "viewer"},
            headers={"authorization": f"Bearer {key_value}"},
        )
        assert response.status_code == status.HTTP_200_OK

    def test_project_access_controls_put_fails_with_only_read_scope(self):
        """Test that PUT requests to project access_controls endpoint fail with only access_control:read scope"""
        key_value = generate_random_token_personal()
        PersonalAPIKey.objects.create(
            user=self.user,
            label="test_key_project_read",
            secure_value=hash_key_value(key_value),
            scopes=["access_control:read"],  # Only read scope, no write permissions
        )

        response = self.client.put(
            f"/api/projects/@current/access_controls",
            {"access_level": "editor"},
            headers={"authorization": f"Bearer {key_value}"},
        )
        assert response.status_code == status.HTTP_403_FORBIDDEN
        assert "access_control:write" in response.json()["detail"]

    def test_project_access_controls_put_succeeds_with_write_scope(self):
        """Test that PUT requests to project access_controls endpoint succeed with access_control:write scope"""
        key_value = generate_random_token_personal()
        PersonalAPIKey.objects.create(
            user=self.user,
            label="test_key_project_write",
            secure_value=hash_key_value(key_value),
            scopes=["access_control:write"],  # Write scope required for PUT
        )

        response = self.client.put(
            f"/api/projects/@current/access_controls",
            {"access_level": "admin", "resource": "project", "resource_id": str(self.team.id)},
            headers={"authorization": f"Bearer {key_value}"},
        )
        assert response.status_code == status.HTTP_200_OK

    def test_resource_access_controls_put_fails_with_only_read_scope(self):
        """Test that PUT requests to resource_access_controls endpoint fail with only access_control:read scope"""
        key_value = generate_random_token_personal()
        PersonalAPIKey.objects.create(
            user=self.user,
            label="test_key_global_read",
            secure_value=hash_key_value(key_value),
            scopes=["access_control:read"],  # Only read scope, no write permissions
        )

        response = self.client.put(
            f"/api/projects/@current/resource_access_controls",
            {"access_level": "editor", "resource": "notebook"},
            headers={"authorization": f"Bearer {key_value}"},
        )
        assert response.status_code == status.HTTP_403_FORBIDDEN
        assert "access_control:write" in response.json()["detail"]

    def test_resource_access_controls_put_succeeds_with_write_scope(self):
        """Test that PUT requests to resource_access_controls endpoint succeed with access_control:write scope"""
        key_value = generate_random_token_personal()
        PersonalAPIKey.objects.create(
            user=self.user,
            label="test_key_global_write",
            secure_value=hash_key_value(key_value),
            scopes=["access_control:write"],  # Write scope required for PUT
        )

        response = self.client.put(
            f"/api/projects/@current/resource_access_controls",
            {"access_level": "editor", "resource": "dashboard"},
            headers={"authorization": f"Bearer {key_value}"},
        )
        assert response.status_code == status.HTTP_200_OK
